<!DOCTYPE html>



  


<html class="theme-next muse use-motion" lang="zh-Hans">
<head>
  <meta charset="UTF-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1"/>
<meta name="theme-color" content="#222">









<meta http-equiv="Cache-Control" content="no-transform" />
<meta http-equiv="Cache-Control" content="no-siteapp" />
















  
  
  <link href="/lib/fancybox/source/jquery.fancybox.css?v=2.1.5" rel="stylesheet" type="text/css" />







<link href="/lib/font-awesome/css/font-awesome.min.css?v=4.6.2" rel="stylesheet" type="text/css" />

<link href="/css/main.css?v=5.1.4" rel="stylesheet" type="text/css" />


  <link rel="apple-touch-icon" sizes="180x180" href="/images/apple-touch-icon-next.png?v=5.1.4">


  <link rel="icon" type="image/png" sizes="32x32" href="/images/favicon-32x32-next.png?v=5.1.4">


  <link rel="icon" type="image/png" sizes="16x16" href="/images/favicon-16x16-next.png?v=5.1.4">


  <link rel="mask-icon" href="/images/logo.svg?v=5.1.4" color="#222">





  <meta name="keywords" content="Hexo, NexT" />










<meta name="description" content="DLedger 基于 raft 协议，故天然支持主从切换，即主节点(Leader)发生故障，会重新触发选主，在集群内再选举出新的主节点。 RocketMQ 中主从同步，从节点不仅会从主节点同步数据，也会同步元数据，包含 topic 路由信息、消费进度、延迟队列处理队列、消费组订阅配置等信息。那主从切换后元数据如何同步呢？特别是主从切换过程中，对消息消费有多大的影响，会丢失消息吗？  温馨提示：本文">
<meta property="og:type" content="article">
<meta property="og:title" content="源码分析 RocketMQ DLedger 多副本即主从切换实现原理">
<meta property="og:url" content="https://www.codingw.net/posts/433e34a1.html">
<meta property="og:site_name" content="中间件兴趣圈">
<meta property="og:description" content="DLedger 基于 raft 协议，故天然支持主从切换，即主节点(Leader)发生故障，会重新触发选主，在集群内再选举出新的主节点。 RocketMQ 中主从同步，从节点不仅会从主节点同步数据，也会同步元数据，包含 topic 路由信息、消费进度、延迟队列处理队列、消费组订阅配置等信息。那主从切换后元数据如何同步呢？特别是主从切换过程中，对消息消费有多大的影响，会丢失消息吗？  温馨提示：本文">
<meta property="og:locale">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191006174148582.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191006174334194.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191006175156723.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191006175216215.png">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191006175250767.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20191006175310688.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="article:published_time" content="2020-10-19T13:51:35.000Z">
<meta property="article:modified_time" content="2021-04-26T12:08:16.996Z">
<meta property="article:author" content="中间件兴趣圈">
<meta property="article:tag" content="中间件">
<meta name="twitter:card" content="summary">
<meta name="twitter:image" content="https://img-blog.csdnimg.cn/20191006174148582.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">



<script type="text/javascript" id="hexo.configurations">
  var NexT = window.NexT || {};
  var CONFIG = {
    root: '',
    scheme: 'Muse',
    version: '5.1.4',
    sidebar: {"position":"left","display":"post","offset":12,"b2t":false,"scrollpercent":false,"onmobile":false},
    fancybox: true,
    tabs: true,
    motion: {"enable":true,"async":false,"transition":{"post_block":"fadeIn","post_header":"slideDownIn","post_body":"slideDownIn","coll_header":"slideLeftIn","sidebar":"slideUpIn"}},
    duoshuo: {
      userId: '0',
      author: '博主'
    },
    algolia: {
      applicationID: '',
      apiKey: '',
      indexName: '',
      hits: {"per_page":10},
      labels: {"input_placeholder":"Search for Posts","hits_empty":"We didn't find any results for the search: ${query}","hits_stats":"${hits} results found in ${time} ms"}
    }
  };
</script>



  <link rel="canonical" href="https://www.codingw.net/posts/433e34a1.html"/>





  <title>源码分析 RocketMQ DLedger 多副本即主从切换实现原理 | 中间件兴趣圈</title>
  








<meta name="generator" content="Hexo 5.4.0"></head>

<body itemscope itemtype="http://schema.org/WebPage" lang="zh-Hans">

  
  
    
  

  <div class="container sidebar-position-left page-post-detail">
    <div class="headband"></div>

    <header id="header" class="header" itemscope itemtype="http://schema.org/WPHeader">
      <div class="header-inner"><div class="site-brand-wrapper">
  <div class="site-meta ">
    

    <div class="custom-logo-site-title">
      <a href="/"  class="brand" rel="start">
        <span class="logo-line-before"><i></i></span>
        <span class="site-title">中间件兴趣圈</span>
        <span class="logo-line-after"><i></i></span>
      </a>
    </div>
      
        <p class="site-subtitle">微信搜『中间件兴趣圈』，回复『Java』获取200本优质电子书</p>
      
  </div>

  <div class="site-nav-toggle">
    <button>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
    </button>
  </div>
</div>

<nav class="site-nav">
  

  
    <ul id="menu" class="menu">
      
        
        <li class="menu-item menu-item-home">
          <a href="/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-home"></i> <br />
            
            首页
          </a>
        </li>
      
        
        <li class="menu-item menu-item-categories">
          <a href="/categories/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-question-circle"></i> <br />
            
            分类
          </a>
        </li>
      
        
        <li class="menu-item menu-item-archives">
          <a href="/archives/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-question-circle"></i> <br />
            
            归档
          </a>
        </li>
      

      
    </ul>
  

  
</nav>



 </div>
    </header>

    <main id="main" class="main">
      <div class="main-inner">
        <div class="content-wrap">
          <div id="content" class="content">
            

  <div id="posts" class="posts-expand">
    

  

  
  
  

  <article class="post post-type-normal" itemscope itemtype="http://schema.org/Article">
  
  
  
  <div class="post-block">
    <link itemprop="mainEntityOfPage" href="https://www.codingw.net/posts/433e34a1.html">

    <span hidden itemprop="author" itemscope itemtype="http://schema.org/Person">
      <meta itemprop="name" content="">
      <meta itemprop="description" content="">
      <meta itemprop="image" content="/images/avatar.gif">
    </span>

    <span hidden itemprop="publisher" itemscope itemtype="http://schema.org/Organization">
      <meta itemprop="name" content="中间件兴趣圈">
    </span>

    
      <header class="post-header">

        
        
          <h1 class="post-title" itemprop="name headline">源码分析 RocketMQ DLedger 多副本即主从切换实现原理</h1>
        

        <div class="post-meta">
          <span class="post-time">
            
              <span class="post-meta-item-icon">
                <i class="fa fa-calendar-o"></i>
              </span>
              
                <span class="post-meta-item-text">发表于</span>
              
              <time title="创建于" itemprop="dateCreated datePublished" datetime="2020-10-19T21:51:35+08:00">
                2020-10-19
              </time>
            

            

            
          </span>

          
            <span class="post-category" >
            
              <span class="post-meta-divider">|</span>
            
              <span class="post-meta-item-icon">
                <i class="fa fa-folder-o"></i>
              </span>
              
                <span class="post-meta-item-text">分类于</span>
              
              
                <span itemprop="about" itemscope itemtype="http://schema.org/Thing">
                  <a href="/categories/rocketmq/" itemprop="url" rel="index">
                    <span itemprop="name">rocketmq</span>
                  </a>
                </span>

                
                
              
            </span>
          

          
            
          

          
          
             <span id="/posts/433e34a1.html" class="leancloud_visitors" data-flag-title="源码分析 RocketMQ DLedger 多副本即主从切换实现原理">
               <span class="post-meta-divider">|</span>
               <span class="post-meta-item-icon">
                 <i class="fa fa-eye"></i>
               </span>
               
                 <span class="post-meta-item-text">阅读次数&#58;</span>
               
                 <span class="leancloud-visitors-count"></span>
             </span>
          

          
            <span class="post-meta-divider">|</span>
            <span class="page-pv"><i class="fa fa-file-o"></i>
            <span class="busuanzi-value" id="busuanzi_value_page_pv" ></span>次
            </span>
          

          

          

        </div>
      </header>
    

    
    
    
    <div class="post-body" itemprop="articleBody">

      
      

      
        <div id="vip-container"><p>DLedger 基于 raft 协议，故天然支持主从切换，即主节点(Leader)发生故障，会重新触发选主，在集群内再选举出新的主节点。</p>
<p>RocketMQ 中主从同步，从节点不仅会从主节点同步数据，也会同步元数据，包含 topic 路由信息、消费进度、延迟队列处理队列、消费组订阅配置等信息。那主从切换后元数据如何同步呢？特别是主从切换过程中，对消息消费有多大的影响，会丢失消息吗？</p>
<blockquote>
<p>温馨提示：本文假设大家已经对 RocketMQ4.5 版本之前的主从同步实现有一定的了解，这部分内容在《RocketMQ技术内幕》一书中有详细的介绍，大家也可以参考如下两篇文章：<br>1、 <a target="_blank" rel="noopener" href="https://blog.csdn.net/prestigeding/article/details/93672079">RocketMQ HA机制(主从同步)</a> 。<br>2、<a target="_blank" rel="noopener" href="https://blog.csdn.net/prestigeding/article/details/101629440">RocketMQ 整合 DLedger(多副本)即主从切换实现平滑升级的设计技巧</a></p>
</blockquote>
<h2 id="1、BrokerController-中与主从相关的方法详解"><a href="#1、BrokerController-中与主从相关的方法详解" class="headerlink" title="1、BrokerController 中与主从相关的方法详解"></a>1、BrokerController 中与主从相关的方法详解</h2><p>本节先对 BrokerController 中与主从切换相关的方法。</p>
<h3 id="1-1-startProcessorByHa"><a href="#1-1-startProcessorByHa" class="headerlink" title="1.1 startProcessorByHa"></a>1.1 startProcessorByHa</h3><p>BrokerController#startProcessorByHa</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">void</span> <span class="title">startProcessorByHa</span><span class="params">(BrokerRole role)</span> </span>&#123;</span><br><span class="line">    <span class="keyword">if</span> (BrokerRole.SLAVE != role) &#123;</span><br><span class="line">        <span class="keyword">if</span> (<span class="keyword">this</span>.transactionalMessageCheckService != <span class="keyword">null</span>) &#123;</span><br><span class="line">            <span class="keyword">this</span>.transactionalMessageCheckService.start();</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>感觉该方法的取名较为随意，该方法的作用是开启事务状态回查处理器，即当节点为主节点时，开启对应的事务状态回查处理器，对PREPARE状态的消息发起事务状态回查请求。</p>
<h3 id="1-2-shutdownProcessorByHa"><a href="#1-2-shutdownProcessorByHa" class="headerlink" title="1.2 shutdownProcessorByHa"></a>1.2 shutdownProcessorByHa</h3><p>BrokerController#shutdownProcessorByHa</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">void</span> <span class="title">shutdownProcessorByHa</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    <span class="keyword">if</span> (<span class="keyword">this</span>.transactionalMessageCheckService != <span class="keyword">null</span>) &#123;</span><br><span class="line">        <span class="keyword">this</span>.transactionalMessageCheckService.shutdown(<span class="keyword">true</span>);</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>关闭事务状态回查处理器，当节点从主节点变更为从节点后，该方法被调用。</p>
<h3 id="1-3-handleSlaveSynchronize"><a href="#1-3-handleSlaveSynchronize" class="headerlink" title="1.3 handleSlaveSynchronize"></a>1.3 handleSlaveSynchronize</h3><p>BrokerController#handleSlaveSynchronize</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">void</span> <span class="title">handleSlaveSynchronize</span><span class="params">(BrokerRole role)</span> </span>&#123;</span><br><span class="line">    <span class="keyword">if</span> (role == BrokerRole.SLAVE) &#123;   <span class="comment">// @1</span></span><br><span class="line">        <span class="keyword">if</span> (<span class="keyword">null</span> != slaveSyncFuture) &#123;   </span><br><span class="line">            slaveSyncFuture.cancel(<span class="keyword">false</span>);</span><br><span class="line">        &#125;</span><br><span class="line">        <span class="keyword">this</span>.slaveSynchronize.setMasterAddr(<span class="keyword">null</span>);   <span class="comment">// </span></span><br><span class="line">        slaveSyncFuture = <span class="keyword">this</span>.scheduledExecutorService.scheduleAtFixedRate(<span class="keyword">new</span> Runnable() &#123;</span><br><span class="line">            <span class="meta">@Override</span></span><br><span class="line">            <span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">run</span><span class="params">()</span> </span>&#123;</span><br><span class="line">                <span class="keyword">try</span> &#123;</span><br><span class="line">                    BrokerController.<span class="keyword">this</span>.slaveSynchronize.syncAll();</span><br><span class="line">                &#125; <span class="keyword">catch</span> (Throwable e) &#123;</span><br><span class="line">                    log.error(<span class="string">&quot;ScheduledTask SlaveSynchronize syncAll error.&quot;</span>, e);</span><br><span class="line">                &#125;</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;, <span class="number">1000</span> * <span class="number">3</span>, <span class="number">1000</span> * <span class="number">10</span>, TimeUnit.MILLISECONDS);</span><br><span class="line">    &#125; <span class="keyword">else</span> &#123;  <span class="comment">// @2</span></span><br><span class="line">        <span class="comment">//handle the slave synchronise</span></span><br><span class="line">        <span class="keyword">if</span> (<span class="keyword">null</span> != slaveSyncFuture) &#123;</span><br><span class="line">            slaveSyncFuture.cancel(<span class="keyword">false</span>);</span><br><span class="line">        &#125;</span><br><span class="line">        <span class="keyword">this</span>.slaveSynchronize.setMasterAddr(<span class="keyword">null</span>);</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>该方法的主要作用是处理从节点的元数据同步，即从节点向主节点主动同步 topic 的路由信息、消费进度、延迟队列处理队列、消费组订阅配置等信息。</p>
<p>代码@1：如果当前节点的角色为从节点：</p>
<ul>
<li>如果上次同步的 future 不为空，则首先先取消。</li>
<li>然后设置 slaveSynchronize 的 master 地址为空。不知大家是否与笔者一样，有一个疑问，从节点的时候，如果将 master 地址设置为空，那如何同步元数据，那这个值会在什么时候设置呢？</li>
<li>开启定时同步任务，每 10s 从主节点同步一次元数据。 </li>
</ul>
<p>代码@2：如果当前节点的角色为主节点，则取消定时同步任务并设置 master 的地址为空。</p>
<h3 id="1-4-changeToSlave"><a href="#1-4-changeToSlave" class="headerlink" title="1.4 changeToSlave"></a>1.4 changeToSlave</h3><p>BrokerController#changeToSlave</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">changeToSlave</span><span class="params">(<span class="keyword">int</span> brokerId)</span> </span>&#123;</span><br><span class="line">    log.info(<span class="string">&quot;Begin to change to slave brokerName=&#123;&#125; brokerId=&#123;&#125;&quot;</span>, brokerConfig.getBrokerName(), brokerId);</span><br><span class="line">    <span class="comment">//change the role</span></span><br><span class="line">    brokerConfig.setBrokerId(brokerId == <span class="number">0</span> ? <span class="number">1</span> : brokerId); <span class="comment">//TO DO check       // @1</span></span><br><span class="line">    messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);                            <span class="comment">// @2</span></span><br><span class="line">    <span class="comment">//handle the scheduled service</span></span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">        <span class="keyword">this</span>.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);    <span class="comment">//  @3</span></span><br><span class="line">    &#125; <span class="keyword">catch</span> (Throwable t) &#123;</span><br><span class="line">        log.error(<span class="string">&quot;[MONITOR] handleScheduleMessageService failed when changing to slave&quot;</span>, t);</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="comment">//handle the transactional service</span></span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">        <span class="keyword">this</span>.shutdownProcessorByHa();                                                                    <span class="comment">//  @4</span></span><br><span class="line">    &#125; <span class="keyword">catch</span> (Throwable t) &#123;</span><br><span class="line">        log.error(<span class="string">&quot;[MONITOR] shutdownProcessorByHa failed when changing to slave&quot;</span>, t);</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="comment">//handle the slave synchronise</span></span><br><span class="line">    handleSlaveSynchronize(BrokerRole.SLAVE);                                               <span class="comment">// @5</span></span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">        <span class="keyword">this</span>.registerBrokerAll(<span class="keyword">true</span>, <span class="keyword">true</span>, brokerConfig.isForceRegister());              <span class="comment">// @6</span></span><br><span class="line">    &#125; <span class="keyword">catch</span> (Throwable ignored) &#123;</span><br><span class="line">    &#125;</span><br><span class="line">    log.info(<span class="string">&quot;Finish to change to slave brokerName=&#123;&#125; brokerId=&#123;&#125;&quot;</span>, brokerConfig.getBrokerName(), brokerId);</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Broker 状态变更为从节点。其关键实现如下：</p>
<ul>
<li>设置 brokerId，如果broker的id为0，则设置为1，这里在使用的时候，注意规划好集群内节点的 brokerId。</li>
<li>设置 broker  的状态为 BrokerRole.SLAVE。</li>
<li>如果是从节点，则关闭定时调度线程(处理 RocketMQ 延迟队列)，如果是主节点，则启动该线程。</li>
<li>关闭事务状态回查处理器。</li>
<li>从节点需要启动元数据同步处理器，即启动 SlaveSynchronize 定时从主服务器同步元数据。</li>
<li>立即向集群内所有的 nameserver 告知 broker  信息状态的变更。</li>
</ul>
<h3 id="1-5-changeToMaster"><a href="#1-5-changeToMaster" class="headerlink" title="1.5 changeToMaster"></a>1.5 changeToMaster</h3><p>BrokerController#changeToMaster</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">changeToMaster</span><span class="params">(BrokerRole role)</span> </span>&#123;</span><br><span class="line">    <span class="keyword">if</span> (role == BrokerRole.SLAVE) &#123;</span><br><span class="line">        <span class="keyword">return</span>;</span><br><span class="line">    &#125;</span><br><span class="line">    log.info(<span class="string">&quot;Begin to change to master brokerName=&#123;&#125;&quot;</span>, brokerConfig.getBrokerName());</span><br><span class="line">    <span class="comment">//handle the slave synchronise</span></span><br><span class="line">    handleSlaveSynchronize(role);   <span class="comment">// @1</span></span><br><span class="line">    <span class="comment">//handle the scheduled service</span></span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">        <span class="keyword">this</span>.messageStore.handleScheduleMessageService(role);      <span class="comment">// @2</span></span><br><span class="line">    &#125; <span class="keyword">catch</span> (Throwable t) &#123;</span><br><span class="line">        log.error(<span class="string">&quot;[MONITOR] handleScheduleMessageService failed when changing to master&quot;</span>, t);</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="comment">//handle the transactional service</span></span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">        <span class="keyword">this</span>.startProcessorByHa(BrokerRole.SYNC_MASTER);         <span class="comment">// @3</span></span><br><span class="line">    &#125; <span class="keyword">catch</span> (Throwable t) &#123;</span><br><span class="line">        log.error(<span class="string">&quot;[MONITOR] startProcessorByHa failed when changing to master&quot;</span>, t);</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="comment">//if the operations above are totally successful, we change to master</span></span><br><span class="line">    brokerConfig.setBrokerId(<span class="number">0</span>); <span class="comment">//TO DO check                              // @4</span></span><br><span class="line">    messageStoreConfig.setBrokerRole(role);                               </span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">        <span class="keyword">this</span>.registerBrokerAll(<span class="keyword">true</span>, <span class="keyword">true</span>, brokerConfig.isForceRegister()); <span class="comment">// @5</span></span><br><span class="line">    &#125; <span class="keyword">catch</span> (Throwable ignored) &#123;</span><br><span class="line">    &#125;</span><br><span class="line">    log.info(<span class="string">&quot;Finish to change to master brokerName=&#123;&#125;&quot;</span>, brokerConfig.getBrokerName());</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>该方法是 Broker 角色从从节点变更为主节点的处理逻辑，其实现要点如下：</p>
<ul>
<li>关闭元数据同步器，因为主节点无需同步。</li>
<li>开启定时任务处理线程。</li>
<li>开启事务状态回查处理线程。</li>
<li>设置 brokerId 为 0。</li>
<li>向 nameserver 立即发送心跳包以便告知 broker 服务器当前最新的状态。</li>
</ul>
<p>主从节点状态变更的核心方法就介绍到这里了，接下来看看如何触发主从切换。</p>
<h2 id="2、如何触发主从切换"><a href="#2、如何触发主从切换" class="headerlink" title="2、如何触发主从切换"></a>2、如何触发主从切换</h2><p>从前面的文章我们可以得知，RocketMQ DLedger 是基于 raft 协议实现的，在该协议中就实现了主节点的选举与主节点失效后集群会自动进行重新选举，经过协商投票产生新的主节点，从而实现高可用。</p>
<p>BrokerController#initialize</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (messageStoreConfig.isEnableDLegerCommitLog()) &#123;</span><br><span class="line">    DLedgerRoleChangeHandler roleChangeHandler = <span class="keyword">new</span> DLedgerRoleChangeHandler(<span class="keyword">this</span>, (DefaultMessageStore) messageStore);</span><br><span class="line">    ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>上述代码片段截取自 BrokerController 的 initialize 方法，我们可以得知在 Broker 启动时，如果开启了 多副本机制，即 enableDLedgerCommitLog 参数设置为 true，会为 集群节点选主器添加 roleChangeHandler 事件处理器，即节点发送变更后的事件处理器。</p>
<span id="more"></span>

<p>接下来我们将重点探讨 DLedgerRoleChangeHandler 。</p>
<h3 id="2-1-类图"><a href="#2-1-类图" class="headerlink" title="2.1 类图"></a>2.1 类图</h3><p><img src="https://img-blog.csdnimg.cn/20191006174148582.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>DLedgerRoleChangeHandler 继承自 RoleChangeHandler，即节点状态发生变更后的事件处理器。上述的属性都很简单，在这里就重点介绍一下 ExecutorService executorService，事件处理线程池，但只会开启一个线程，故事件将一个一个按顺序执行。</p>
<p>接下来我们来重点看一下 handle 方法的执行。</p>
<h3 id="2-2-handle-主从状态切换处理逻辑"><a href="#2-2-handle-主从状态切换处理逻辑" class="headerlink" title="2.2 handle 主从状态切换处理逻辑"></a>2.2 handle 主从状态切换处理逻辑</h3><p>DLedgerRoleChangeHandler#handle</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">handle</span><span class="params">(<span class="keyword">long</span> term, MemberState.Role role)</span> </span>&#123;</span><br><span class="line">    Runnable runnable = <span class="keyword">new</span> Runnable() &#123;</span><br><span class="line">        <span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">run</span><span class="params">()</span> </span>&#123;</span><br><span class="line">            <span class="keyword">long</span> start = System.currentTimeMillis();</span><br><span class="line">            <span class="keyword">try</span> &#123;</span><br><span class="line">                <span class="keyword">boolean</span> succ = <span class="keyword">true</span>;</span><br><span class="line">                log.info(<span class="string">&quot;Begin handling broker role change term=&#123;&#125; role=&#123;&#125; currStoreRole=&#123;&#125;&quot;</span>, term, role, messageStore.getMessageStoreConfig().getBrokerRole());</span><br><span class="line">                <span class="keyword">switch</span> (role) &#123;</span><br><span class="line">                    <span class="keyword">case</span> CANDIDATE:    <span class="comment">// @1</span></span><br><span class="line">                        <span class="keyword">if</span> (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) &#123;</span><br><span class="line">                            brokerController.changeToSlave(dLedgerCommitLog.getId());</span><br><span class="line">                        &#125;</span><br><span class="line">                        <span class="keyword">break</span>;</span><br><span class="line">                    <span class="keyword">case</span> FOLLOWER:         <span class="comment">// @2</span></span><br><span class="line">                        brokerController.changeToSlave(dLedgerCommitLog.getId());</span><br><span class="line">                        <span class="keyword">break</span>;</span><br><span class="line">                    <span class="keyword">case</span> LEADER:           <span class="comment">// @3</span></span><br><span class="line">                        <span class="keyword">while</span> (<span class="keyword">true</span>) &#123;</span><br><span class="line">                            <span class="keyword">if</span> (!dLegerServer.getMemberState().isLeader()) &#123;</span><br><span class="line">                                succ = <span class="keyword">false</span>;</span><br><span class="line">                                <span class="keyword">break</span>;</span><br><span class="line">                            &#125;</span><br><span class="line">                            <span class="keyword">if</span> (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -<span class="number">1</span>) &#123;</span><br><span class="line">                                <span class="keyword">break</span>;</span><br><span class="line">                            &#125;</span><br><span class="line">                            <span class="keyword">if</span> (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()</span><br><span class="line">                                &amp;&amp; messageStore.dispatchBehindBytes() == <span class="number">0</span>) &#123;</span><br><span class="line">                                <span class="keyword">break</span>;</span><br><span class="line">                            &#125;</span><br><span class="line">                            Thread.sleep(<span class="number">100</span>);</span><br><span class="line">                        &#125;</span><br><span class="line">                        <span class="keyword">if</span> (succ) &#123;</span><br><span class="line">                            messageStore.recoverTopicQueueTable();</span><br><span class="line">                            brokerController.changeToMaster(BrokerRole.SYNC_MASTER);</span><br><span class="line">                        &#125;</span><br><span class="line">                        <span class="keyword">break</span>;</span><br><span class="line">                    <span class="keyword">default</span>:</span><br><span class="line">                        <span class="keyword">break</span>;</span><br><span class="line">                &#125;</span><br><span class="line">                log.info(<span class="string">&quot;Finish handling broker role change succ=&#123;&#125; term=&#123;&#125; role=&#123;&#125; currStoreRole=&#123;&#125; cost=&#123;&#125;&quot;</span>, succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start));</span><br><span class="line">            &#125; <span class="keyword">catch</span> (Throwable t) &#123;</span><br><span class="line">                log.info(<span class="string">&quot;[MONITOR]Failed handling broker role change term=&#123;&#125; role=&#123;&#125; currStoreRole=&#123;&#125; cost=&#123;&#125;&quot;</span>, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t);</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;;</span><br><span class="line">    executorService.submit(runnable);</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：如果当前节点状态机状态为 CANDIDATE，表示正在发起 Leader 节点，如果该服务器的角色不是 SLAVE 的话，需要将状态切换为 SLAVE。</p>
<p>代码@2：如果当前节点状态机状态为 FOLLOWER，broker 节点将转换为 从节点。</p>
<p>代码@3：如果当前节点状态机状态为 Leader，说明该节点被选举为 Leader，在切换到 Master 节点之前，首先需要等待当前节点追加的数据都已经被提交后才可以将状态变更为 Master，其关键实现如下：</p>
<ul>
<li>如果 ledgerEndIndex 为 -1，表示当前节点还未又数据转发，直接跳出循环，无需等待。</li>
<li>如果 ledgerEndIndex 不为 -1 ，则必须等待数据都已提交，即 ledgerEndIndex 与 committedIndex 相等。</li>
<li>并且需要等待  commitlog 日志全部已转发到 consumequeue中，即 ReputMessageService 中的 reputFromOffset 与 commitlog 的 maxOffset 相等。</li>
</ul>
<p>等待上述条件满足后，即可以进行状态的变更，需要恢复 ConsumeQueue，维护每一个 queue 对应的 maxOffset，然后将 broker 角色转变为 master。</p>
<p>经过上面的步骤，就能实时完成 broker 主节点的自动切换。由于单从代码的角度来看主从切换不够直观，下面我将给出主从切换的流程图。</p>
<h3 id="2-3-主从切换流程图"><a href="#2-3-主从切换流程图" class="headerlink" title="2.3 主从切换流程图"></a>2.3 主从切换流程图</h3><p>由于从源码的角度或许不够直观，故本节给出其流程图。</p>
<blockquote>
<p>温馨提示：该流程图的前半部分在 源码分析 RocketMQ 整合 DLedger(多副本)实现平滑升级的设计技巧 该文中有所阐述。</p>
</blockquote>
<p><img src="https://img-blog.csdnimg.cn/20191006174334194.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"></p>
<h2 id="3、主从切换若干问题思考"><a href="#3、主从切换若干问题思考" class="headerlink" title="3、主从切换若干问题思考"></a>3、主从切换若干问题思考</h2><p>我相信经过上面的讲解，大家应该对主从切换的实现原理有了一个比较清晰的理解，我更相信读者朋友们会抛出一个疑问，主从切换会不会丢失消息，消息消费进度是否会丢失而导致重复消费呢？</p>
<h3 id="3-1-消息消费进度是否存在丢失风险"><a href="#3-1-消息消费进度是否存在丢失风险" class="headerlink" title="3.1 消息消费进度是否存在丢失风险"></a>3.1 消息消费进度是否存在丢失风险</h3><p>首先，由于 RocketMQ 元数据，当然也包含消息消费进度的同步是采用的从服务器定时向主服务器拉取进行更新，存在时延，引入 DLedger 机制，也并不保证其一致性，DLedger 只保证 commitlog 文件的一致性。</p>
<p>当主节点宕机后，各个从节点并不会完成同步了消息消费进度，于此同时，消息消费继续，此时消费者会继续从从节点拉取消息进行消费，但汇报的从节点并不一定会成为新的主节点，故消费进度在 broker 端存在丢失的可能性。当然并不是一定会丢失，因为消息消费端只要不重启，消息消费进度会存储在内存中。</p>
<p>综合所述，消息消费进度在 broker  端会有丢失的可能性，存在重复消费的可能性，不过问题不大，因为 RocketMQ 本身也不承若不会重复消费。</p>
<h3 id="3-2-消息是否存在丢失风险"><a href="#3-2-消息是否存在丢失风险" class="headerlink" title="3.2 消息是否存在丢失风险"></a>3.2 消息是否存在丢失风险</h3><p>消息会不会丢失的关键在于，日志复制进度较慢的从节点是否可以被选举为主节点，如果在一个集群中，从节点的复制进度落后与从主节点，但当主节点宕机后，如果该从节点被选举成为新的主节点，那这将是一个灾难，将会丢失数据。关于一个节点是否给另外一个节点投赞成票的逻辑在 <a target="_blank" rel="noopener" href="https://blog.csdn.net/prestigeding/article/details/99697323">源码分析 RocketMQ DLedger 多副本之 Leader 选主</a> 的 2.4.2 handleVote 方法中已详细介绍，在这里我以截图的方式再展示其核心点：<br><img src="https://img-blog.csdnimg.cn/20191006175156723.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br><img src="https://img-blog.csdnimg.cn/20191006175216215.png" alt="在这里插入图片描述"><br>从上面可以得知，如果发起投票节点的复制进度比自己小的话，会投拒绝票。其<br><img src="https://img-blog.csdnimg.cn/20191006175250767.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br><img src="https://img-blog.csdnimg.cn/20191006175310688.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>必须得到集群内超过半数节点认可，即最终选举出来的主节点的当前复制进度一定是比绝大多数的从节点要大，并且也会等于承偌给客户端的已提交偏移量。故得出的结论是不会丢消息。</p>
<p>本文的介绍就到此为止了，最后抛出一个思考题与大家相互交流学习，也算是对 DLedger 多副本即主从切换一个总结回顾。答案我会以留言的方式或在下一篇文章中给出。</p>
<h2 id="4、思考题"><a href="#4、思考题" class="headerlink" title="4、思考题"></a>4、思考题</h2><p>例如一个集群内有5个节点的 DLedgr 集群。<br>Leader Node:  n0-broker-a<br>folloer Node:   n1-broker-a,n2-broker-a,n3-broker-a,n4-broker-a</p>
<p>从节点的复制进度可能不一致，例如：<br>n1-broker-a复制进度为 100<br>n2-broker-a复制进度为 120<br>n3-broker-a复制进度为 90<br>n4-broker-a负载进度为 90</p>
<p>如果此时 n0-broker-a 节点宕机，触发选主，如果  n1率先发起投票，由于 n1,的复制进度大于 n3,n4，再加上自己一票，是有可能成为leader的，此时消息会丢失吗？为什么？</p>
</div>

			<script src="https://my.openwrite.cn/js/readmore.js" type="text/javascript"></script>
			<script>
			var isMobile = navigator.userAgent.match(/(phone|pad|pod|iPhone|iPod|ios|iPad|Android|Mobile|BlackBerry|IEMobile|MQQBrowser|JUC|Fennec|wOSBrowser|BrowserNG|WebOS|Symbian|Windows Phone)/i);
			if (!isMobile) {
			    var btw = new BTWPlugin();
			    btw.init({
			        "id": "vip-container",
			        "blogId": "18019-1573088808868-542",
			        "name": "中间件兴趣圈",
			        "qrcode": "https://img-blog.csdnimg.cn/20190314214003962.jpg",
			        "keyword": "more"
			    });
			}
			</script>
		
      
    </div>
    
    
    

    

    

    

    <footer class="post-footer">
      

      
      
      

      
        <div class="post-nav">
          <div class="post-nav-next post-nav-item">
            
              <a href="/posts/91680207.html" rel="next" title="源码分析 RocketMQ DLedger(多副本) 之日志复制(传播)">
                <i class="fa fa-chevron-left"></i> 源码分析 RocketMQ DLedger(多副本) 之日志复制(传播)
              </a>
            
          </div>

          <span class="post-nav-divider"></span>

          <div class="post-nav-prev post-nav-item">
            
              <a href="/posts/f4f9d0d8.html" rel="prev" title="源码分析 RateLimiter SmoothBursty 实现原理(文末附流程图)">
                源码分析 RateLimiter SmoothBursty 实现原理(文末附流程图) <i class="fa fa-chevron-right"></i>
              </a>
            
          </div>
        </div>
      

      
      
    </footer>
  </div>
  
  
  
  </article>



    <div class="post-spread">
      
    </div>
  </div>


          </div>
          


          

  



        </div>
        
          
  
  <div class="sidebar-toggle">
    <div class="sidebar-toggle-line-wrap">
      <span class="sidebar-toggle-line sidebar-toggle-line-first"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-middle"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-last"></span>
    </div>
  </div>

  <aside id="sidebar" class="sidebar">
    
    <div class="sidebar-inner">

      

      
        <ul class="sidebar-nav motion-element">
          <li class="sidebar-nav-toc sidebar-nav-active" data-target="post-toc-wrap">
            文章目录
          </li>
          <li class="sidebar-nav-overview" data-target="site-overview-wrap">
            站点概览
          </li>
        </ul>
      

      <section class="site-overview-wrap sidebar-panel">
        <div class="site-overview">
          <div class="site-author motion-element" itemprop="author" itemscope itemtype="http://schema.org/Person">
            
              <p class="site-author-name" itemprop="name"></p>
              <p class="site-description motion-element" itemprop="description"></p>
          </div>

          <nav class="site-state motion-element">

            
              <div class="site-state-item site-state-posts">
              
                <a href="/archives/">
              
                  <span class="site-state-item-count">139</span>
                  <span class="site-state-item-name">日志</span>
                </a>
              </div>
            

            
              
              
              <div class="site-state-item site-state-categories">
                <a href="/categories/index.html">
                  <span class="site-state-item-count">18</span>
                  <span class="site-state-item-name">分类</span>
                </a>
              </div>
            

            

          </nav>

          

          

          
          

          
          

          

        </div>
      </section>

      
      <!--noindex-->
        <section class="post-toc-wrap motion-element sidebar-panel sidebar-panel-active">
          <div class="post-toc">

            
              
            

            
              <div class="post-toc-content"><ol class="nav"><li class="nav-item nav-level-2"><a class="nav-link" href="#1%E3%80%81BrokerController-%E4%B8%AD%E4%B8%8E%E4%B8%BB%E4%BB%8E%E7%9B%B8%E5%85%B3%E7%9A%84%E6%96%B9%E6%B3%95%E8%AF%A6%E8%A7%A3"><span class="nav-number">1.</span> <span class="nav-text">1、BrokerController 中与主从相关的方法详解</span></a><ol class="nav-child"><li class="nav-item nav-level-3"><a class="nav-link" href="#1-1-startProcessorByHa"><span class="nav-number">1.1.</span> <span class="nav-text">1.1 startProcessorByHa</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#1-2-shutdownProcessorByHa"><span class="nav-number">1.2.</span> <span class="nav-text">1.2 shutdownProcessorByHa</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#1-3-handleSlaveSynchronize"><span class="nav-number">1.3.</span> <span class="nav-text">1.3 handleSlaveSynchronize</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#1-4-changeToSlave"><span class="nav-number">1.4.</span> <span class="nav-text">1.4 changeToSlave</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#1-5-changeToMaster"><span class="nav-number">1.5.</span> <span class="nav-text">1.5 changeToMaster</span></a></li></ol></li><li class="nav-item nav-level-2"><a class="nav-link" href="#2%E3%80%81%E5%A6%82%E4%BD%95%E8%A7%A6%E5%8F%91%E4%B8%BB%E4%BB%8E%E5%88%87%E6%8D%A2"><span class="nav-number">2.</span> <span class="nav-text">2、如何触发主从切换</span></a><ol class="nav-child"><li class="nav-item nav-level-3"><a class="nav-link" href="#2-1-%E7%B1%BB%E5%9B%BE"><span class="nav-number">2.1.</span> <span class="nav-text">2.1 类图</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#2-2-handle-%E4%B8%BB%E4%BB%8E%E7%8A%B6%E6%80%81%E5%88%87%E6%8D%A2%E5%A4%84%E7%90%86%E9%80%BB%E8%BE%91"><span class="nav-number">2.2.</span> <span class="nav-text">2.2 handle 主从状态切换处理逻辑</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#2-3-%E4%B8%BB%E4%BB%8E%E5%88%87%E6%8D%A2%E6%B5%81%E7%A8%8B%E5%9B%BE"><span class="nav-number">2.3.</span> <span class="nav-text">2.3 主从切换流程图</span></a></li></ol></li><li class="nav-item nav-level-2"><a class="nav-link" href="#3%E3%80%81%E4%B8%BB%E4%BB%8E%E5%88%87%E6%8D%A2%E8%8B%A5%E5%B9%B2%E9%97%AE%E9%A2%98%E6%80%9D%E8%80%83"><span class="nav-number">3.</span> <span class="nav-text">3、主从切换若干问题思考</span></a><ol class="nav-child"><li class="nav-item nav-level-3"><a class="nav-link" href="#3-1-%E6%B6%88%E6%81%AF%E6%B6%88%E8%B4%B9%E8%BF%9B%E5%BA%A6%E6%98%AF%E5%90%A6%E5%AD%98%E5%9C%A8%E4%B8%A2%E5%A4%B1%E9%A3%8E%E9%99%A9"><span class="nav-number">3.1.</span> <span class="nav-text">3.1 消息消费进度是否存在丢失风险</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#3-2-%E6%B6%88%E6%81%AF%E6%98%AF%E5%90%A6%E5%AD%98%E5%9C%A8%E4%B8%A2%E5%A4%B1%E9%A3%8E%E9%99%A9"><span class="nav-number">3.2.</span> <span class="nav-text">3.2 消息是否存在丢失风险</span></a></li></ol></li><li class="nav-item nav-level-2"><a class="nav-link" href="#4%E3%80%81%E6%80%9D%E8%80%83%E9%A2%98"><span class="nav-number">4.</span> <span class="nav-text">4、思考题</span></a></li></ol></div>
            

          </div>
        </section>
      <!--/noindex-->
      

      

    </div>
  </aside>


        
      </div>
    </main>

    <footer id="footer" class="footer">
      <div class="footer-inner">
        <div class="copyright">&copy; <span itemprop="copyrightYear">2021</span>
  <span class="with-love">
    <i class="fa fa-user"></i>
  </span>
  <span class="author" itemprop="copyrightHolder">中间件兴趣圈</span>

  
</div>


  <div class="powered-by">由 <a class="theme-link" target="_blank" href="https://hexo.io">Hexo</a> 强力驱动</div>



  <span class="post-meta-divider">|</span>



  <div class="theme-info">主题 &mdash; <a class="theme-link" target="_blank" href="https://github.com/iissnan/hexo-theme-next">NexT.Muse</a> v5.1.4</div>




        
<div class="busuanzi-count">
  <script async src="https://dn-lbstatics.qbox.me/busuanzi/2.3/busuanzi.pure.mini.js"></script>

  
    <span class="site-uv">
      <i class="fa fa-user"></i>
      <span class="busuanzi-value" id="busuanzi_value_site_uv"></span>
      
    </span>
  

  
    <span class="site-pv">
      <i class="fa fa-eye"></i>
      <span class="busuanzi-value" id="busuanzi_value_site_pv"></span>
      
    </span>
  
</div>








        
      </div>
    </footer>

    
      <div class="back-to-top">
        <i class="fa fa-arrow-up"></i>
        
      </div>
    

    

  </div>

  

<script type="text/javascript">
  if (Object.prototype.toString.call(window.Promise) !== '[object Function]') {
    window.Promise = null;
  }
</script>









  












  
  
    <script type="text/javascript" src="/lib/jquery/index.js?v=2.1.3"></script>
  

  
  
    <script type="text/javascript" src="/lib/fastclick/lib/fastclick.min.js?v=1.0.6"></script>
  

  
  
    <script type="text/javascript" src="/lib/jquery_lazyload/jquery.lazyload.js?v=1.9.7"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.ui.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/fancybox/source/jquery.fancybox.pack.js?v=2.1.5"></script>
  


  


  <script type="text/javascript" src="/js/src/utils.js?v=5.1.4"></script>

  <script type="text/javascript" src="/js/src/motion.js?v=5.1.4"></script>



  
  

  
  <script type="text/javascript" src="/js/src/scrollspy.js?v=5.1.4"></script>
<script type="text/javascript" src="/js/src/post-details.js?v=5.1.4"></script>



  


  <script type="text/javascript" src="/js/src/bootstrap.js?v=5.1.4"></script>



  


  




	





  





  












  





  

  
  <script src="https://cdn1.lncld.net/static/js/av-core-mini-0.6.4.js"></script>
  <script>AV.initialize("NNEhOL0iOcflg8f1U3HUqiCq-gzGzoHsz", "7kSmkbbb3DktmHALlShDsBUF");</script>
  <script>
    function showTime(Counter) {
      var query = new AV.Query(Counter);
      var entries = [];
      var $visitors = $(".leancloud_visitors");

      $visitors.each(function () {
        entries.push( $(this).attr("id").trim() );
      });

      query.containedIn('url', entries);
      query.find()
        .done(function (results) {
          var COUNT_CONTAINER_REF = '.leancloud-visitors-count';

          if (results.length === 0) {
            $visitors.find(COUNT_CONTAINER_REF).text(0);
            return;
          }

          for (var i = 0; i < results.length; i++) {
            var item = results[i];
            var url = item.get('url');
            var time = item.get('time');
            var element = document.getElementById(url);

            $(element).find(COUNT_CONTAINER_REF).text(time);
          }
          for(var i = 0; i < entries.length; i++) {
            var url = entries[i];
            var element = document.getElementById(url);
            var countSpan = $(element).find(COUNT_CONTAINER_REF);
            if( countSpan.text() == '') {
              countSpan.text(0);
            }
          }
        })
        .fail(function (object, error) {
          console.log("Error: " + error.code + " " + error.message);
        });
    }

    function addCount(Counter) {
      var $visitors = $(".leancloud_visitors");
      var url = $visitors.attr('id').trim();
      var title = $visitors.attr('data-flag-title').trim();
      var query = new AV.Query(Counter);

      query.equalTo("url", url);
      query.find({
        success: function(results) {
          if (results.length > 0) {
            var counter = results[0];
            counter.fetchWhenSave(true);
            counter.increment("time");
            counter.save(null, {
              success: function(counter) {
                var $element = $(document.getElementById(url));
                $element.find('.leancloud-visitors-count').text(counter.get('time'));
              },
              error: function(counter, error) {
                console.log('Failed to save Visitor num, with error message: ' + error.message);
              }
            });
          } else {
            var newcounter = new Counter();
            /* Set ACL */
            var acl = new AV.ACL();
            acl.setPublicReadAccess(true);
            acl.setPublicWriteAccess(true);
            newcounter.setACL(acl);
            /* End Set ACL */
            newcounter.set("title", title);
            newcounter.set("url", url);
            newcounter.set("time", 1);
            newcounter.save(null, {
              success: function(newcounter) {
                var $element = $(document.getElementById(url));
                $element.find('.leancloud-visitors-count').text(newcounter.get('time'));
              },
              error: function(newcounter, error) {
                console.log('Failed to create');
              }
            });
          }
        },
        error: function(error) {
          console.log('Error:' + error.code + " " + error.message);
        }
      });
    }

    $(function() {
      var Counter = AV.Object.extend("Counter");
      if ($('.leancloud_visitors').length == 1) {
        addCount(Counter);
      } else if ($('.post-title-link').length > 1) {
        showTime(Counter);
      }
    });
  </script>



  

  

  
  

  

  

  

</body>
</html>
